package yuekao1.util;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import yuekao1.entity.Tm4_1;
import yuekao1.entity.Tm4_2;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class ClickHouseUtil  extends RichSinkFunction<Tm4_2> {
    Connection conn = null;
    PreparedStatement ps = null;
    @Override
    public void open(Configuration parameters) throws Exception {
        //1.加载驱动类
        Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
        //2.得到连接对象
        conn = DriverManager.getConnection("jdbc:clickhouse://hadoop-single:8123", "default", "");
    }

    @Override
    public void close() throws Exception {
        ps.close();
        conn.close();
    }

    @Override
    public void invoke(Tm4_2 value, Context context) throws Exception {
        ps=conn.prepareStatement("insert into yklx_1 values (?,?,?,?,?,?,?,?)");
        ps.setObject(1,value.getSt());
        ps.setObject(2,value.getEd());
        ps.setObject(3,value.getSize());
        ps.setObject(4,value.getSumprice());
        ps.setObject(5,value.getCount());
        ps.setObject(6,value.getName());
        ps.setObject(7,value.getSkuname());
        ps.setObject(8,value.getUsername());
        ps.execute();
    }
}
